/****************************************************
 * 创建人：@author fengxin    
 * 创建时间: 2019/12/31/14:57
 * 项目名称: zmq
 * 文件名称: DealerRouter.java
 * 文件描述: dealer connect to router 
 *
 * All rights Reserved, Designed By 投资交易团队
 * @Copyright:2016-2019
 *
 ********************************************************/
package com.learn.zmq.repreq.dealerrouterdealer;

import org.zeromq.*;

/**
 * 包名称：com.learn.zmq.repreq.dealerrouter
 * 类名称：DealerRouter
 * 类描述：dealer connect to router
 * 创建人：@author fengxin
 * 创建时间：2019/12/31/14:57
 */
public class DealerRouterDealer {

    private static String WorkerName = "Worker";

    private static Integer RouterPort = 5559;

    private static String RouterAddr = "tcp://localhost:"+RouterPort;

    private static String RouterBindAddr = "tcp://0.0.0.0:"+RouterPort;


    private static class DealerClient implements ZThread.IAttachedRunnable {

        String ClientName;

        public DealerClient(String clientName) {
            ClientName = clientName;
        }

        @Override
        public void run(Object[] objects, ZContext zContext, ZMQ.Socket socket) {
            ZMQ.Socket dealer = zContext.createSocket(SocketType.DEALER);
            dealer.setLinger(0);
            dealer.setHWM(0);
            dealer.setIdentity(ClientName.getBytes(ZMQ.CHARSET));
            dealer.connect(RouterAddr);


            System.out.println("DealerClient connect to:"+RouterAddr);

            int msgNo = 0;

           while (true) {
                msgNo++;

               // 发送消息
               ZMsg msg = new ZMsg();
               // 消息内容
               msg.push(msgNo+" Hello");
               ZFrame workerId =  new ZFrame(WorkerName);
               msg.push(workerId);
               msg.addFirst("");

//               msg.wrap(workerId);

               msg.send(dealer);



               // 接收消息
               ZMsg recvMsg = ZMsg.recvMsg(dealer);
               // 空帧
               ZFrame empty = recvMsg.unwrap();
               // 空帧后为真正的消息
               ZFrame recvMsgContent = recvMsg.pop();
               // 源id
               ZFrame sourceId = recvMsg.unwrap();

               recvMsgContent.print(ClientName+" revMsg from "+WorkerName+":");

               ZMQ.sleep(1);
           }
        }

    }

    private static class ReqClient implements ZThread.IAttachedRunnable {
        String ClientName;

        public ReqClient(String clientName) {
            ClientName = clientName;
        }
        @Override
        public void run(Object[] objects, ZContext zContext, ZMQ.Socket socket) {
            ZMQ.Socket dealer = zContext.createSocket(SocketType.REQ);
            dealer.setHWM(0);
            dealer.setIdentity(ClientName.getBytes(ZMQ.CHARSET));
            dealer.connect(RouterAddr);


            System.out.println("ReqClient connect to:"+RouterAddr);

            int msgNo = 0;

            while (true) {
                msgNo++;

                // 发送消息
                ZMsg msg = new ZMsg();
                // 消息内容
                msg.push(msgNo+" Hello");
                ZFrame workerId =  new ZFrame(WorkerName);
                msg.push(workerId);
//               msg.wrap(workerId);

                msg.send(dealer);

                // 接收消息
                ZMsg recvMsg = ZMsg.recvMsg(dealer);

                ZFrame recvMsgContent = recvMsg.pop();

                recvMsgContent.print(ClientName+" revMsg from "+WorkerName+":");

                recvMsg.destroy();

                ZMQ.sleep(1);
            }
        }

    }

    private static class Broker implements ZThread.IAttachedRunnable {

        public static ZMsg routerSendMsg(ZFrame sourceId,ZFrame targetId,ZFrame... contents) {
            // 组装转发消息
            ZMsg transfer = new ZMsg();
            // 源Socket Identity
            transfer.wrap(sourceId);
            // 消息内容
            for (ZFrame content : contents) {
                transfer.addFirst(content.duplicate());
            }
            // 目标Socket Identity
            transfer.wrap(targetId);

            return transfer;
        }

        @Override
        public void run(Object[] objects, ZContext zContext, ZMQ.Socket socket) {
            ZMQ.Socket router = zContext.createSocket(SocketType.ROUTER);
            router.setHWM(0);
            router.setLinger(0);
            router.setIdentity("Broker".getBytes(ZMQ.CHARSET));
            router.bind(RouterBindAddr);
            System.out.println("Broker bind to:"+RouterBindAddr);
            ZFrame empty;

            while(true) {

                // 接收消息
                ZMsg msg = ZMsg.recvMsg(router);
                ZFrame senderId = msg.pop();
                empty = msg.pop();
                ZFrame rargetId = msg.pop();
                ZFrame content = msg.pop();

                // 组装转发消息
                ZMsg transfer=routerSendMsg(senderId,rargetId,content);

                // 发送消息
                transfer.send(router);

            }



        }
    }

    private static class Worker implements ZThread.IAttachedRunnable {

        @Override
        public void run(Object[] objects, ZContext zContext, ZMQ.Socket socket) {
            ZMQ.Socket worker = zContext.createSocket(SocketType.DEALER);
            worker.setHWM(0);
            worker.setLinger(0);
            worker.setIdentity(WorkerName.getBytes(ZMQ.CHARSET));
            worker.connect(RouterAddr);
            System.out.println("Worker connect to:"+RouterAddr);


            while(true) {
                // worker接收消息
                ZMsg msg = ZMsg.recvMsg(worker);

                // 空帧
                ZFrame empty = msg.unwrap();

                // id后为真正的消息
                ZFrame msgContent = msg.pop();

                // 取出源id
                ZFrame sourceId = msg.unwrap();

                msgContent.print(WorkerName+" revMsg from "+sourceId.getString(ZMQ.CHARSET)+":");

                // worker回复消息
                ZMsg reply = new ZMsg();
                reply.push("Nice to meet you.");
                // 给reply加上返回的目标id
                reply.push(sourceId);

                reply.addFirst("");

                reply.send(worker);
            }

        }
    }




    public static void main(String[] args) {

        try (ZContext ctx = new ZContext()) {
            ZThread.fork(ctx, new Broker());
            ZThread.fork(ctx, new Worker());
            // client 使用REQ
             ZThread.fork(ctx, new ReqClient("ReqClient"));

            // client 使用Dealer
            ZThread.fork(ctx, new DealerClient("DealerClient1"));
            // client 使用Dealer
            ZThread.fork(ctx, new DealerClient("DealerClient2"));

            // 阻塞 否则会直接退出

            synchronized (ctx) {
                try {
                    ctx.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }



    }
}
